package cn.doitedu.rtdw.data_etl;

import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * Flink SQL job that counts ad impressions and clicks per user and ad creative
 * in 5-minute tumbling event-time windows, joins each aggregated row with the
 * ad dimension table stored in HBase, and writes the result to Doris.
 *
 * @Author: deep as the sea
 * @Site: <a href="www.51doit.com">doitedu</a>
 * @QQ: 657270652
 * @Date: 2023/4/10
 **/
public class Etl07_AdviceEventsAnalyse {

    /**
     * Job entry point: builds the table environment, registers the source,
     * dimension, view and sink tables, then submits the INSERT statement.
     *
     * @param args command-line arguments (not used)
     */
    public static void main(String[] args) {
        StreamTableEnvironment tableEnv = buildTableEnvironment();

        registerKafkaDetailSource(tableEnv);
        registerHbaseAdDimension(tableEnv);
        registerWindowedAggregateView(tableEnv);
        registerDorisSink(tableEnv);
        submitEnrichedInsert(tableEnv);
    }

    /**
     * Creates the streaming table environment used by the job.
     *
     * <p>Checkpointing runs every 2000 ms in exactly-once mode, checkpoints are
     * stored under {@code file:/d:/ckpt} and retained on cancellation, and the
     * job parallelism is fixed to 1.
     *
     * @return the table environment bound to the configured stream environment
     */
    private static StreamTableEnvironment buildTableEnvironment() {
        StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        streamEnv.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);

        CheckpointConfig checkpointConfig = streamEnv.getCheckpointConfig();
        checkpointConfig.setCheckpointStorage("file:/d:/ckpt");
        checkpointConfig.setExternalizedCheckpointCleanup(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        streamEnv.setParallelism(1);
        return StreamTableEnvironment.create(streamEnv);
    }

    /**
     * Registers the Kafka connector table {@code dwd_kafka}, which maps the
     * DWD-layer user-behavior wide table in topic {@code dwd-events-detail}.
     *
     * <p>Besides the physical columns, the table declares a processing-time
     * column {@code pt} and an event-time column {@code rt} derived from
     * {@code event_time}, with a zero-delay watermark on {@code rt}.
     *
     * @param tableEnv table environment to register the table in
     */
    private static void registerKafkaDetailSource(StreamTableEnvironment tableEnv) {
        String kafkaDdl =
                "  CREATE TABLE dwd_kafka (                           " +
                "     user_id           BIGINT,                     " +
                "     event_id          string,                     " +
                "     event_time        bigint,                     " +
                "     device_type       string,                     " +
                "     page_type         string,                     " +
                "     properties        map<string,string>,         " +
                "     pt AS proctime() ,                            " +
                "     rt AS to_timestamp_ltz(event_time,3),         " +
                "     WATERMARK FOR rt AS rt - interval '0' second  " +
                " ) WITH (                                          " +
                "  'connector' = 'kafka',                           " +
                "  'topic' = 'dwd-events-detail',                   " +
                "  'properties.bootstrap.servers' = 'doitedu:9092', " +
                "  'properties.group.id' = 'testGroup',             " +
                "  'scan.startup.mode' = 'latest-offset',           " +
                "  'value.format'='json',                           " +
                "  'value.json.fail-on-missing-field'='false',      " +
                "  'value.fields-include' = 'EXCEPT_KEY')           ";
        tableEnv.executeSql(kafkaDdl);
    }

    /**
     * Registers the HBase connector table {@code ad_hbase}, which maps the ad
     * dimension table {@code dim_ad_info}. The row key is {@code creative_id};
     * the descriptive attributes live in column family {@code f}.
     *
     * @param tableEnv table environment to register the table in
     */
    private static void registerHbaseAdDimension(StreamTableEnvironment tableEnv) {
        String hbaseDdl =
                "CREATE TABLE ad_hbase ( "
                + " creative_id STRING, "
                + " f ROW<creative_name STRING,ad_id STRING,ad_name STRING,ad_farther STRING,ad_campain STRING>, "
                + " PRIMARY KEY (creative_id) NOT ENFORCED "
                + ") WITH (                             "
                + " 'connector' = 'hbase-2.2',          "
                + " 'table-name' = 'dim_ad_info',     "
                + " 'zookeeper.quorum' = 'doitedu:2181' "
                + ")";
        tableEnv.executeSql(hbaseDdl);
    }

    /**
     * Registers the temporary view {@code tmp_view} that pre-aggregates the ad
     * events before the dimension join.
     *
     * <p>Only {@code ad_show} and {@code ad_click} events are kept. They are
     * grouped by 5-minute tumbling window on {@code rt}, user and creative, and
     * the view emits the show count and click count of each group.
     *
     * <p>Sample output recorded by the original author:
     * <pre>
     * +----+---------+-------------+----------+-----------+
     * | op | user_id | creative_id | show_cnt | click_cnt |
     * +----+---------+-------------+----------+-----------+
     * | +I |       3 | creative001 |        1 |         0 |
     * | +I |       3 | creative002 |        2 |         1 |
     * | +I |       5 | creative001 |        1 |         0 |
     * | +I |       5 | creative003 |        1 |         1 |
     * | +I |       3 | creative004 |        1 |         1 |
     * | +I |       3 | creative003 |        2 |         0 |
     * </pre>
     *
     * @param tableEnv table environment to register the view in
     */
    private static void registerWindowedAggregateView(StreamTableEnvironment tableEnv) {
        // max(pt) carries a pt column through the aggregation so that the later
        // join can reference v.pt in its FOR SYSTEM_TIME AS OF clause.
        String viewDdl =
                " CREATE TEMPORARY VIEW tmp_view AS                               " +
                " WITH tmp AS (                                          " +
                " SELECT                                                 " +
                "   user_id,                                             " +
                "   event_id,                                            " +
                "   event_time,                                          " +
                "   properties['creative_id'] as creative_id,            " +
                "   rt,                                                  " +
                "   pt                                                   " +
                " from dwd_kafka                                         " +
                " where event_id in ('ad_show','ad_click')               " +
                " )                                                      " +
                "                                                        " +
                " SELECT                                                 " +
                "    max(pt) as pt,                                      " +
                "    user_id,                                            " +
                "    creative_id,                                        " +
                "    sum(if(event_id='ad_show',1,0)) as show_cnt,        " +
                "    sum(if(event_id='ad_click',1,0)) as click_cnt       " +
                " FROM TABLE(                                            " +
                "  TUMBLE(TABLE tmp,DESCRIPTOR(rt),INTERVAL '5' MINUTE)  " +
                " )                                                      " +
                " GROUP BY                                               " +
                "   window_start,                                        " +
                "   window_end,                                          " +
                "   user_id,                                             " +
                "   creative_id                                          ";
        tableEnv.executeSql(viewDdl);
    }

    /**
     * Registers the Doris connector table {@code ad_doris}, which maps the
     * physical result table {@code dws.ad_ana_agg}.
     *
     * @param tableEnv table environment to register the table in
     */
    private static void registerDorisSink(StreamTableEnvironment tableEnv) {
        // The current timestamp is appended to the label prefix, so every launch
        // of the job uses a different prefix.
        String dorisDdl =
                " create table ad_doris (             " +
                "    user_id    bigint,                                         " +
                "    creative_id  string,                                     " +
                "    creative_name  string,                                   " +
                "    ad_id  string,                                           " +
                "    ad_name  string,                                         " +
                "    ad_farther string,                                      " +
                "    ad_campain string,                                      " +
                "    show_cnt bigint,                                        " +
                "    click_cnt bigint                                       " +
                " ) WITH (                               " +
                "    'connector' = 'doris',              " +
                "    'fenodes' = 'doitedu:8030',         " +
                "    'table.identifier' = 'dws.ad_ana_agg',  " +
                "    'username' = 'root',                " +
                "    'password' = '',                    " +
                "    'sink.label-prefix' = 'doris_tl" + System.currentTimeMillis() + "')";
        tableEnv.executeSql(dorisDdl);
    }

    /**
     * Submits the INSERT that left-joins the aggregated view with the HBase ad
     * dimension table on {@code creative_id} and writes the rows to Doris.
     *
     * <p>Sample result recorded by the original author (the {@code ad_name},
     * {@code ad_farther} and {@code ad_campain} columns are omitted here):
     * <pre>
     * +---------+-------------+----------------------+-------+----------+-----------+
     * | user_id | creative_id | creative_name        | ad_id | show_cnt | click_cnt |
     * +---------+-------------+----------------------+-------+----------+-----------+
     * |       3 | creative001 | fashion-tech-dynamic | ad01  |        1 |         0 |
     * |       3 | creative002 | static-pic           | ad01  |        2 |         1 |
     * |       3 | creative003 | hearbeat-text        | ad02  |        2 |         0 |
     * |       3 | creative004 | video-superime       | ad02  |        1 |         1 |
     * |       5 | creative001 | fashion-tech-dynamic | ad01  |        1 |         0 |
     * |       5 | creative003 | hearbeat-text        | ad02  |        1 |         1 |
     * +---------+-------------+----------------------+-------+----------+-----------+
     * </pre>
     *
     * @param tableEnv table environment holding the registered tables and view
     */
    private static void submitEnrichedInsert(StreamTableEnvironment tableEnv) {
        // NOTE(review): the dimension attributes are declared inside column
        // family f of ad_hbase but are referenced here without the f. qualifier.
        String insertSql =
                " INSERT INTO  ad_doris                                          " +
                " SELECT                                               " +
                "   v.user_id,                                         " +
                "   v.creative_id,                                     " +
                "   d.creative_name,                                   " +
                "   d.ad_id,                                           " +
                "   d.ad_name,                                         " +
                "   d.ad_farther,                                      " +
                "   d.ad_campain,                                      " +
                "   v.show_cnt,                                        " +
                "   v.click_cnt                                        " +
                " FROM tmp_view v                                      " +
                " LEFT JOIN ad_hbase FOR SYSTEM_TIME AS OF v.pt AS d   " +
                "   ON v.creative_id = d.creative_id                   ";
        tableEnv.executeSql(insertSql);
    }
}
